indexed nearest-by join #1
Draft
sezruby wants to merge 10 commits into
Draft
Conversation
Introduce the per-task vector-search primitive and its supporting types.
`LanceProbe` opens a Lance dataset once and drives `nearest()` + row-id-based
materialize calls against it. Unit-tested with recall=1.0 against brute-force
oracles on both uniform-random and clustered-embedding fixtures.
New artifacts:
- `LanceProbe` — open-once, probe-many wrapper around Lance's Java API.
- `Metric` — L2 / Cosine / Dot enum, `smallerIsBetter` flag threaded
through to ordering logic.
- `ScoredRowRef` — (rowId, score) pair crossing the inter-stage boundary.
- `LanceProbeValidationTest` + `LanceVectorIndexBuilder` test helper.
- New `lance-spark-knn_2.12` / `_2.13` modules in the reactor.
No functional impact on existing modules; `lance-spark-knn` is an additive
module reachable only through its own API.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…opKHeap
Build the three-stage kNN-join pipeline on top of LanceProbe:
- LanceProbeStage — per-task nearest-search emitting (leftId, ProbedLeft).
- LanceMergeStage — per-partition bounded-heap merge of contributions
per leftId, trimming to finalK.
- LanceMaterializeStage — point-fetch right rows by _rowid, assemble
final join Rows.
Plus the TopKHeap primitive (metric-aware bounded heap for the merge-side
aggregation) and the public entry point `IndexedNearestJoin.apply(left,
rightLanceUri, leftVecCol, rightVecCol, k, metric, scoreCol)`.
End-to-end tested: `IndexedNearestJoinCorrectnessTest` verifies recall=1.0
vs. an in-memory brute-force oracle at 1K × 100 × dim=16. `IndexedNearestJoinTest`
covers the public-API surface (left-outer join, custom score column,
projection list, refine factor).
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…elism Opt-in `probeParallelism: Int = 1` parameter on `IndexedNearestJoin.apply`. When set > 1, the driver enumerates Lance fragments via `Dataset.getFragments()`, groups them (round-robin or LPT bin-packing when `balanceFragmentsByRowCount = true`), and replicates each left row across the groups so N parallel tasks each probe a disjoint fragment subset. Downstream merge aggregates the N contributions per leftId. The bandwidth win the staged design promises only lands here — Phase 0/1 had the shape but a single contributor per leftId (degenerate merge). Phase 1.5 makes the merge stage do real work across tasks. Edge case: when `probeParallelism > numFragments`, only one group has fragments and the rule degenerates gracefully back to the single-task path, avoiding a replicate shuffle for nothing. Oracle-verified for G=4 and G=8 (with and without skew-balanced grouping) against brute force. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
… shuffle
Surface the staged pipeline as explicit Spark operators so `df.explain()`
shows the shape and Catalyst/AQE can engage on the merge shuffle:
LanceProbeExec
-> ShuffleExchangeExec hashpartitioning(_leftId) <- Catalyst inserts this
-> LanceMergeExec <- via EnsureRequirements
-> LanceMaterializeExec <- from
requiredChildDistribution
= ClusteredDistribution(_leftId)
on LanceMergeExec
Wrapped by AdaptiveSparkPlanExec. With AQE on, `CoalesceShufflePartitions`
/ `OptimizeSkewJoin` / `OptimizeShuffleWithLocalRead` all engage on the
merge shuffle (visibly `AQEShuffleRead coalesced` in the executed plan).
ColumnPruning subtlety: `LanceMergeLogicalPlan` and
`LanceMaterializeLogicalPlan` override `lazy val references =
child.outputSet`. Without this override, Catalyst's ColumnPruning rule
inserts `Project(Nil)` between custom nodes when downstream consumers
reference no columns (count(*), agg, select(lit(1))); `ProjectExec(Nil)`
codegens to 0-field UnsafeRows which crash `ProbedLeftCodec.Decoder` at
`ir.getLong(0)` — AssertionError under interpreter, SIGSEGV under C2 JIT.
The override makes the custom nodes declare all child outputs load-bearing,
short-circuiting ColumnPruning's subset guard.
Inter-stage row format: `ProbedLeftCodec` uses a flat schema (leftId + left
columns inlined + refs array-of-struct) rather than nested struct — earlier
multi-pass / nested-struct codec attempts had binary-layout issues at
benchmark scale.
`LanceKnnDatasetBridge` in `org.apache.spark.sql` is a trampoline to the
package-private `Dataset.ofRows`, locating it via reflection: Spark 3.x
exposes it on `org.apache.spark.sql.Dataset`; Spark 4.0 moved the concrete
implementation to `org.apache.spark.sql.classic.Dataset`. The bridge tries
both at startup and caches the winner, so the knn module compiles + runs
against Spark 3.5, 4.0, 4.1, and 4.2-SNAPSHOT from a single source.
Five test suites pin the behavior: AQE visibility, plan shape, consumer
shape (the crash-shapes from the ColumnPruning investigation), JIT stress,
structural pin on the references override, plus the two isolation tests
from the post-mortem kept as regression coverage.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
User-facing extension over `DataFrame` that mirrors `df.join(other, ...)` and wraps `IndexedNearestJoin.apply` with right-side URI auto-extraction from the analyzed plan. import org.lance.spark.knn.LanceKnnImplicits._ leftDf.kNearestJoin(rightDf, leftVecCol = "v", rightVecCol = "v", k = 10) Non-Lance right sides (parquet, in-memory, alias-wrapped non-Lance) fail fast with IllegalArgumentException naming the constraint. Works on Spark 3.5 / 4.0 / 4.1 / 4.2+ via the reflection-based Dataset.ofRows lookup in LanceKnnDatasetBridge (introduced in the preceding 3-exec commit). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…PQ recall Four substantive additions: 1. refineFactor / ef parameters on IndexedNearestJoin.apply, plumbed through LanceProbeStage.Conf to Query.Builder (setRefineFactor / setEf). IVF-PQ recall knob (fetches k*refineFactor PQ candidates, re-ranks with exact distance) and HNSW search-depth knob respectively. Defaults preserve current behavior. 2. balanceFragmentsByRowCount flag — LPT greedy bin-packing (4/3-optimal makespan approximation) on FragmentMetadata.getNumRows, used instead of round-robin when the fragment-row-count distribution is skewed. 3. Prefilter pushdown into the base module. Extends LanceFragmentScanner to carry a user-supplied SQL filter string, and LanceSparkReadOptions to serialize it. IndexedNearestJoin uses this to push right-side WHERE clauses into Lance's index-lookup path (prefilter = true is always set), so top-K is computed over only matching rows — correctness, not just perf: without prefilter, an indexed probe could return K rows all later filtered out, masking truly-nearest-but-also-matching rows further down the index. 4. Switched the whole pipeline from _rowaddr to _rowid. Lance's indexed nearest-search materializes _rowid but not _rowaddr; using _rowid on both probe + materialize paths makes it work for indexed AND non-indexed scans uniformly. IndexedNearestJoinIvfPqRecallTest builds a real IVF-PQ index via Lance's Java API and measures recall@K: 0.73 at defaults, 1.00 with refineFactor=8 (exact-distance re-rank recovers all true neighbors). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
New module `lance-spark-knn-4.2_2.13` adds a Catalyst postHocResolutionRule that intercepts Spark 4.2's NearestByJoin (SPARK-56395) over a Lance scan and emits the same 3-plan staged logical tree the DataFrame API path builds. Shared `LanceKnnStagedStrategy` lowers both paths to the identical LanceProbeExec -> ShuffleExchangeExec -> LanceMergeExec -> LanceMaterializeExec chain. Subtle: the rule MUST use `injectPostHocResolutionRule`, not `injectOptimizerRule`. Spark's built-in RewriteNearestByJoin runs in the optimizer's FinishAnalysis batch (the very first batch); rules added via injectOptimizerRule fire in operatorOptimizationBatch, which runs AFTER FinishAnalysis. By the time an injected optimizer rule fires, the NearestByJoin operator has already been rewritten to a cross-product + MaxMinByK plan — nothing left to pattern-match. Pattern match recognizes the three SPARK-56395 ranking expressions (VectorL2Distance + NearestByDistance, VectorCosineSimilarity + NearestBySimilarity, VectorInnerProduct + NearestBySimilarity) over a Lance DSv2 relation. Direction must match expression's natural ordering. Rule is opt-in via `spark.lance.knn.indexedNearestByJoin.enabled` (default false) until a cost-based gate lands in Phase 3.x. Prefilter pushdown: unwraps Filter(cond, lance) and Project(<passthrough>, Filter(...)), translates the predicate to Lance SQL (binary comparisons, IN, IS [NOT] NULL, AND/OR/NOT over right-side attrs vs literals). Anything else makes the rule REFUSE the rewrite (no partial pushdown — dropping a residual conjunct would silently change query semantics). Tests: IndexedNearestByJoinRuleTest covers the pattern-match positive + negative cases and pins the emitted 3-plan tree shape. IndexedNearestByJoinE2ETest drives a real Lance dataset end-to-end on Spark 4.2-SNAPSHOT, asserts all three execs + the Catalyst-inserted hashpartitioning(_leftId) exchange are in the physical plan, and matches top-K results against an in-memory brute-force oracle at dim=16 + dim=1024. Rule-off falls through to Spark's RewriteNearestByJoin and still matches the oracle — proves the opt-in gate doesn't break correctness. Schema note: `NearestByJoin.output` widens every left+right attribute to nullable=true (matching what Spark's default rewrite does via the First aggregate). The rule widens the materialize stage's internal finalSchema to match, keeping the ExpressionEncoder layout consistent with the declared output. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…here recall, SQL
Seven benchmarks validate correctness + perf + scaling on real + synthetic
data. All use `write.format("noop")` as the timing sink (Spark's canonical
benchmark pattern — materializes every row without a driver round-trip) and
gate correctness through a 16-row brute-force oracle before timed runs.
- IndexedNearestJoinBenchmark -- synthetic random, dim=128, 5 configs
(crossJoin baseline + 4 indexed variants)
- WikipediaKnnPerfBenchmark -- Cohere wikipedia-2023-11-embed-multilingual
parquet shards, dim=1024, real embeddings
- SiftRecallBenchmark -- canonical SIFT1M corpus, IVF-FLAT recall@10
- CohereWikiRecallBenchmark -- IVF-FLAT recall on Cohere wiki, dim=1024
- IndexedNearestJoinSoakTest -- concurrent sustained load (10-min smoke
window, 492 queries, driver heap
stability check)
- IndexedNearestByJoinSqlBenchmark (in lance-spark-knn-4.2_2.13)
-- SQL-path counterpart of the synthetic
benchmark; measures rule ON vs OFF
- InterStagePayloadOverheadBench (test-scope microbench)
-- encode-decode overhead of ProbedLeftCodec
at realistic row widths, measured <1% of
total wall-clock at every SQL benchmark
scale
Validation on a real OSS Spark 3.5 cluster (8 × 4c/16g executors, Kubernetes):
Cohere wiki dim=1024, |R|=1K × |L|=50 — indexed path is 100-200x faster
than crossJoin (7-iter median 160x; variance from multi-tenant CPU
contention, order-of-magnitude robust). SIFT1M IVF-FLAT recall@10 = 0.98 at
nprobes=16, 1.00 at nprobes=64 — within noise of published FAISS numbers.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
… results
Seven reviewer-facing docs live next to the lance-spark-knn_2.12 sources:
- DESIGN.md -- end-to-end architecture, why no-index
Lance beats Spark cross-product
(SIMD / columnar / no-Catalyst
breakdown), Phase 0-3.x evolution.
- IMPL_PLAN.md -- original architecture sketch, phase
plan, Phase 3.x backlog, the 3-exec
staged split post-mortem (ColumnPruning
-> Project(Nil) -> 0-field UnsafeRow
-> SIGSEGV and how the references
override fixed it).
- PHASE_PROGRESS.md -- resume-without-context notes for
new-session reviewers.
- REVIEWER_GUIDE.md -- ~10-min reading order + file map +
test map + trust-but-verify checklist.
Start-here doc.
- UPSTREAM_DELIVERY_PLAN.md -- 7-PR split strategy for delivering
the feature to lance-format/lance-spark,
redundancy audit, explicit out-of-scope
items.
- BENCHMARK_RESULTS.md -- local M5 Max numbers + OSS Spark 3.5
cluster numbers with variance
envelope, per-iteration tables, and
reproduction instructions.
- NEARESTBYJOIN_ANN_PROPOSAL.md -- standalone proposal doc for sharing
with apache/spark maintainers on
SPARK-56395. Frames the PoC as "one
concrete implementation of the
indexed-path follow-up" with Lance
sidecar extension for parquet/delta
and five open questions.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1d6ec2c to
f875e88
Compare
…aling sweep
Two changes to land alongside the cluster scaling runs:
1. Replace `row_number window` headline baseline with `groupBy + sort_array(K)` (config A2).
The previous A baseline applies a `row_number().over(Window.partitionBy(lid))` over
the full cross-product; that requires a global per-lid sort with no partial
aggregation, runs hours at medium scale, and isn't what Spark 4.2's
RewriteNearestByJoin actually produces. A2 uses
`groupBy(lid).agg(slice(sort_array(collect_list(struct(dist, rid))), 1, K))` —
the closest Spark 3.5 SQL expression of 4.2's `min_by(struct, expr, K)`
(`MaxMinByK`, SPARK-55322). Spark applies partial aggregation per task so the
shuffle volume stays bounded. A is preserved as opt-in via
`BENCHMARK_INCLUDE_BASELINE_A=true`.
2. Add baseline-sweep + medium_l100 ground-truth scales for cross-cluster
scaling characterization. Sample |R|={10K,50K,100K,200K} at fixed |L|=1000,
plus one |R|=1M, |L|=100 ground truth (10x reduced |L|). Cross-product cost is
linear in both |L| and |R|, so this combination lets us extrapolate full medium
(|R|=1M, |L|=1K = 1B pairs) cheaply (~30 min cluster total) while validating the
linearity assumption against an independent ground-truth measurement.
Two cluster knobs surfaced from the runs:
- BENCH_DISABLE_AQE=true: AQE's CoalesceShufflePartitions throttles parallelism
on small post-shuffle data (collapses 128 partitions to ~8), capping the
cross-join compute stage at 8 parallel tasks regardless of cluster cores.
Off for baseline runs; indexed runs benefit from AQE on the merge shuffle.
- BENCH_BASELINE_RIGHT_PARTITIONS=N: repartition right side post-Lance-read so
the fused cross-join compute stage gets enough tasks to use all cores.
Default 64; matches an 8x8c cluster.
Doc update: BENCHMARK_RESULTS.md now has a "Synthetic benchmark" section with
the full cross-cluster sweep, big-vs-small comparison, and an honest variance
disclosure (multi-tenant ~20% noise envelope; noisy-neighbor pods that can
make one executor 2-3x slower across a whole run; executor-death retry
inflation). Includes setup instructions for first-time reviewers and
methodology callouts (oracle gating, noop sink, AQE rationale, A2 vs 4.2-native).
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
Indexed approximate-nearest-neighbor join over Lance's vector indexes for Apache Spark. Adds an idiomatic DataFrame API plus a Catalyst integration that intercepts Spark 4.2's
NearestByJoinoperator and routes it to a per-fragment Lance probe + Spark-side merge instead of the defaultO(|L|×|R|)cross-product rewrite.👉 Reviewer guide + upstream delivery plan
This branch is ~13 K LoC and not ideal for single-pass review. Before diving in:
REVIEWER_GUIDE.md— reading order, file map, trust-but-verify checklist. Reads in ~10 minutes. Start here.UPSTREAM_DELIVERY_PLAN.md— how this branch will be split into 7 smaller PRs forlance-format/lance-spark:main. Documents redundancy to clean up and explicit out-of-scope items (benchmarks, the Spark 4.2 module, deployment-specific build config).Most reviewers will want
REVIEWER_GUIDE.md§ "Start here" (3 files, 10 min) → § "Next: read the engine" (4staged/files, 30 min) → skim the test map. That's enough context to form an opinion on the design.👉 For apache/spark maintainers on SPARK-56395
NEARESTBYJOIN_ANN_PROPOSAL.mdframes this PoC as "one concrete implementation of the indexed path SPARK-56395 mentions as future work." Doesn't propose any apache/spark changes; uses only the extension points SPARK-56395 + existinginjectPostHocResolutionRulealready provide. Includes a section on extending to parquet/delta via Lance sidecar pattern with honest assessment of where the random-access cost limits the sidecar approach, plus five open questions about small apache/spark additions that would help downstream implementers.Commits
Organized as 9 feature-boundary commits matching the upstream delivery plan:
feat(knn): Phase 0 foundation — LanceProbe primitive + metric typesfeat(knn): staged RDD pipeline + IndexedNearestJoin.apply + bounded TopKHeapfeat(knn): Phase 1.5 — fragment-grouped probing for multi-task parallelismfeat(knn): 3-exec Catalyst-visible staged plan with AQE-visible merge shufflefeat(knn): df.kNearestJoin DataFrame extension methodfeat(knn): Phase 3 hardening — refineFactor, prefilter pushdown, IVF-PQ recallfeat(knn): Spark 4.2 SQL integration — IndexedNearestByJoinRuletest(knn-bench): benchmark suite — synthetic, Wikipedia perf, SIFT/Cohere recall, SQLdocs(knn): design, impl plan, reviewer guide, ANN proposal, benchmark resultsEach commit bodies carries the "why" for that feature — see the log.
Headlines
Correctness: 60 tests in
lance-spark-knn_2.12+ 17 tests inlance-spark-knn-4.2_2.13. Every benchmark gates through a 16-row brute-force oracle before quoting numbers. IVF-PQ recall@10 = 1.00 withrefineFactor=8; SIFT1M IVF-FLAT recall@10 = 0.98 at nprobes=16, 1.00 at nprobes=64 (within noise of published FAISS numbers).Perf (OSS Spark 3.5 cluster, 8 × 4c/16g executors on Kubernetes): Cohere
wikipedia-2023-11-embed-multilingual-v3at dim=1024, |R|=1K × |L|=50 — indexed path is 100–200× faster than Spark's cross-product baseline (7-iter median 160×; variance from multi-tenant CPU contention, order-of-magnitude is robust). Measured withwrite.format("noop")timing sink and oracle-gated correctness. Baseline is tight at ±2% (~65s); indexed path lands at 400–500ms.Perf (Apple M5 Max, synthetic dim=128):
crossJoin + array_distance UDF + row_number windowRewriteNearestByJoincross-product +min_by_k)Full results + methodology in
BENCHMARK_RESULTS.md.Public API
Architecture
Both the DataFrame API (
IndexedNearestJoin.apply→LanceKnnDatasetBridge) and the SQL path (IndexedNearestByJoinRulein Spark 4.2) emit this same 3-plan logical tree and shareLanceKnnStagedStrategyfor the physical lowering.df.explain()shows four Catalyst nodes (LanceProbe → Exchange → LanceMerge → LanceMaterialize) wrapped byAdaptiveSparkPlanExec. With AQE enabled,AQEShuffleRead coalescedappears on the merge-side shuffle after the first collection.Key subtlety —
injectPostHocResolutionRule, NOTinjectOptimizerRule. Spark's built-inRewriteNearestByJoinruns in the optimizer'sFinishAnalysisbatch (the very first batch); rules added viainjectOptimizerRulefire inoperatorOptimizationBatchwhich runs AFTERFinishAnalysis. By the time an injected optimizer rule fires, theNearestByJoinoperator has already been rewritten to a cross-product, so the SQL integration usesinjectPostHocResolutionRule— the only injection point that sees the unrewritten operator. Documented in the rule's scaladoc and inDESIGN.md§ "WhyinjectPostHocResolutionRule".ColumnPruning subtlety.
LanceMergeLogicalPlanandLanceMaterializeLogicalPlanoverridelazy val references = child.outputSet. Without this override, Catalyst'sColumnPruningrule insertsProject(Nil)wrappers between custom nodes when downstream consumers (count(*),Aggregate) reference no columns;ProjectExec(Nil)codegens to 0-fieldUnsafeRows, andProbedLeftCodec.Decodercrashes readingir.getLong(0)— AssertionError under interpreter, SIGSEGV under C2 JIT. SeeIMPL_PLAN.md"3-exec staged split — root cause and fix" for the full investigation + isolation walk. Structurally pinned byStagedPlansReferencesTest.Cross-version
Single source compiles + tests pass against Spark 3.5 AND 4.0 on the
_2.13module.LanceKnnDatasetBridgeuses a reflection-basedDataset.ofRowslookup that tries bothorg.apache.spark.sql.Dataset(3.x) andorg.apache.spark.sql.classic.Dataset(4.0+), caching the winner per-session. Full CI matrix (3.4 / 3.5 / 4.0 / 4.1) still TODO; seeIMPL_PLAN.md"Cross-version DataFrame API parity".Test coverage
ShuffleExchangeExec hashpartitioning(_leftId)present underAdaptiveSparkPlanExec).count(),agg(count("*")),select(lit(1)),collect()— the crash-shapes from the ColumnPruning investigation).refineFactor.All details in
REVIEWER_GUIDE.md§ "Test map" and § "Trust-but-verify checklist".Out of scope / follow-up
Per
UPSTREAM_DELIVERY_PLAN.md:spark.lance.knn.indexedNearestByJoin.enabledflag.runWithFragmentGroups's internalpartitionByremains RDD-level; the merge-side shuffle IS AQE-visible).LanceProbecache to amortize dataset open across small partitions.df.kNearestJoin.Documentation
REVIEWER_GUIDE.md— reading order, file map, "where to check first", trust-but-verify checklist, test map. For reviewers: start here.UPSTREAM_DELIVERY_PLAN.md— 7-PR split strategy for delivering tolance-format/lance-spark:main, redundancy cleanup list, out-of-scope items.NEARESTBYJOIN_ANN_PROPOSAL.md— SPARK-56395-specific framing for apache/spark maintainers; sidecar pattern for parquet/delta + five open questions.DESIGN.md— overall feature design, architecture with the 4-node physical plan, the "why no-index Lance still beats Spark cross-product" SIMD/columnar/no-Catalyst breakdown, and benchmark validation methodology.IMPL_PLAN.md— architecture sketch + phase plan + Phase 3.x backlog table + "3-exec staged split — root cause and fix" post-mortem section.PHASE_PROGRESS.md— resume-without-context notes.BENCHMARK_RESULTS.md— M5 Max local numbers plus OSS Spark 3.5 cluster numbers (100–200× speedup on Cohere wiki dim=1024, SIFT1M recall, sustained-load soak).🤖 Generated with Claude Code